LXM: Better Splittable Pseudorandom Number Generators (and Almost as Fast)
En el 2021 Guy Steele y Sebastiano Vigna presentaron un generador de números pseudo aleatorios (PRNG) que se construye a partir de otro PRNG en donde colaboro Guy Steele llamado SplitMix. La principal propiedad de ambos PRNGs es que estos generadores son partibles (splittable), es decir, se pueden dividir en dos nuevos generadores estadísticamente independientes, lo cual es de gran utilidad en ambientes concurrentes.
LXM se basa en combinar tres ideas: un generador lineal congruente (la L), un generador basado en XORs (la X) y el resultado de la combinación de esos dos generadores utilizarlo como input en una función mezcladora (la M).
En este trabajo nos centraremos específicamente en la parte práctica del algoritmo y lo re-implementaremos en nuestro propio código, evitando hablar tanto de la concurrencia como de la propiedad partible del algoritmo (lo cual consiste de la mayoría del paper de Guy Steele y Sebastiano Vigna). Las ideas principales surgen de la sección 2 del paper (THE LXM GENERATION ALGORITHM)
def LXM(s, m=2891336453, a=1310709051, x0=11565345, x1=5242836, k=64):
# https://gist.github.com/trietptm/5cd60ed6add5adad6a34098ce255949a
rotl = lambda val, r_bits, max_bits: \
(val << r_bits%max_bits) & (2**max_bits-1) | \
((val & (2**max_bits-1)) >> (max_bits-(r_bits%max_bits)))
while True:
# Combining operation
z = s + x0
# Mixing function (lea64)
z = (z ^ (z >> 32)) * 0xdaba0b6eb09322e3
z = (z ^ (z >> 32)) * 0xdaba0b6eb09322e3
z = (z ^ (z >> 32))
z &= (2**k) - 1
# Update the LCG subgenerator
s = (m * s + a)
# Update the XBG subgenerator (xoroshiro128v1_0)
q0 = x0; q1 = x1
q1 ^= q0
q0 = rotl(q0, 24, k)
q0 = q0 ^ q1 ^ (q1 << 16)
q1 = rotl(q1, 37, k)
x0 = q0; x1 = q1
# Return result (modified to be in interval 0-1)
yield z/2**k
n = 5
print(f"{n} números generados al azar, en el intervalo 0-1:")
g = LXM(42)
for r, i in zip(g,range(n)):
print(f"* {r}")
5 números generados al azar, en el intervalo 0-1: * 0.793781427453092 * 0.2155520740899848 * 0.7433470653358116 * 0.28660629734277526 * 0.515362051632894
# Empezamos con un scatterplot
g = LXM(42)
n = 1000
x = []; y = []
for i in range(n):
x.append(next(g))
y.append(next(g))
plt.title(f"Dispersión de {n} números generados al azar")
plt.ylabel("X(n)-1")
plt.xlabel("X(n)")
plt.scatter(x,y,50)
<matplotlib.collections.PathCollection at 0x7fae278fd180>
En el gráfico de dispersión se pretende ver si la generación de los números sigue algún patrón. En el gráfico realizado no se observa ninguno, ya que no se ven formas claras, como por ejemplo una línea recta, ni zonas con una densidad marcadamente superior a la del resto. Lo que se observa es una "nube de puntos", que es lo que debería verse en una distribución uniforme.
# Solamente como demostración, hacemos un scatterplot 3D
g = LXM(42)
n = 3000
fig = plt.figure()
ax = plt.axes(projection='3d')
x = []; y = []; z = []
for i in range(n):
x.append(next(g))
y.append(next(g))
z.append(next(g))
ax.scatter3D(x, y, z);
# Histogramas
g = LXM(42)
# Regla de Struges
cantidades = [512, 2896, 52016]
intervalos = [0.1, 0.08, 0.06]
for cantidad, intervalo in zip(cantidades, intervalos):
muestras = []
for j in range(cantidad):
muestras.append(next(g))
fig, ax = plt.subplots()
sns.histplot(data=muestras, binwidth=intervalo, binrange=(0, 1))
ax.set_title(f"Histograma de muestras con intervalos de tamaño {intervalo}")
ax.set_ylabel("Cantidad de muestras")
plt.show()
En un histograma se muestra la cantidad de observaciones de la variable estudiada según el intervalo. Permite observar como se distribuyen los números generados. Por ejemplo, si las barras graficadas tuvieran diferencias significativas en sus alturas esto querría decir que la distribución no es uniforme. En los histogramas hechos, podemos ver que a medida que aumenta la cantidad de muestras usada, y consecuentemente se reduce el tamaño del intervalo, la diferencia que se observa entre las barras disminuye. Esto culmina en el último gráfico, que usa 52016 muestras e intervalos de tamaño 0.06, en el que la longitud de las barras es prácticamente igual. Por esta razón podemos decir que utilizando una cantidad de muestras suficientes (ej: 52016), las muestras tenderán a distribuirse uniformemente en los intervalos.
# Boxplot
g = LXM(42)
n = 10000
muestras = []
for j in range(n):
muestras.append(next(g))
g = sns.boxplot(muestras)
g.set_yticks([0, 0.25, 0.5, 0.75, 1])
plt.title("Boxplot")
Text(0.5, 1.0, 'Boxplot')
Un boxplot muestra los cuartiles de los números generados, el soporte de la distribución y los outliers. Para el caso actual, se puede utilizar para comparar esos datos con los de una distribución uniforme. Así, se puede ver que:
import numpy as np
import statsmodels.api as sm
from scipy.stats import uniform
g = LXM(42)
# QQPlot
cantidades = [100, 1000, 10000]
for cantidad in cantidades:
muestras = []
for j in range(cantidad):
muestras.append(next(g))
muestras_np = np.array(muestras)
fig, ax = plt.subplots()
sm.qqplot(muestras_np, dist=uniform, line='45', ax=ax)
ax.set_title(f"Comparación entre LXM y la distribución uniforme\n (usando {cantidad} muestras)")
ax.set_ylabel("Cuantiles de la muestra")
ax.set_xlabel("Cuantiles teóricos")
pylab.show()
En el QQplot se comparan los cuantiles de la variable a estudiar, en este caso los de los números generados con LXM, con los de una variable ideal, en este caso una distribución uniforme. Este gráfico permite observar si la probabilidad acumulada a izquierda de cada valor de la variable a estudiar se aleja o no del de la variable ideal. Se puede ver que si bien en el primer gráfico se observan claras diferencias entre los cuantiles teóricos y los de la muestra, estas pueden ser atribuidas a la baja cantidad de muestras tomadas para dicho gráfico. Lo anterior se debe a que dichas diferencias desaparecen al aumentar la cantidad de muestras en los gráficos siguientes. De hecho, se puede apreciar que en el gráfico hecho utilizando 1000 muestras, aproximadamente entre 0.1 y 0.4 los cuantiles de la muestra se encuentran apenas por debajo de los teóricos, pero esto se corrige en el gráfico hecho 10000 muestras. Por lo tanto, podemos decir que tomando una cantidad de muestras lo suficientemente grande (ej:10000), los cuantiles de los números generados y los de una distribución aleatoria no presentan diferencias.
Con este método comparamos dos distribuciones de probabilidad. Por una parte, usaremos el generador que implementamos, y por otra parte, para la distribución esperada.
Con este método comparamos dos distribuciones de probabilidad. Por una parte, la observada, que es la que obtenemos utilizando nuestro generador LXM. Por otra parte, la distribución esperada, que es la distribución que queremos corroborar que la observada sigue.
from scipy.stats import chi2
import numpy as np
import math
class Chi2:
def __init__(self, numero_de_muestras=1000, g = None):
self._generador = g or LXM(42)
self._numero_de_muestras = numero_de_muestras
def realizar_prueba(self):
frecuencias_observadas = self.frecuencias_observadas()
f_esperada = 1/10 * sum(frecuencias_observadas)
d2 = sum([(f_observada - f_esperada)**2 for f_observada in frecuencias_observadas]) / f_esperada
limite_superior = chi2.ppf(0.95, df=5)
print(f'Frecuencias: {frecuencias_observadas}')
print('Estadístico: {:.2f}'.format(d2))
print(f'Límite superior: {round(limite_superior, 2)}')
self.validar_hipotesis(d2, limite_superior)
def calcular_d2(self):
numerador = (f_observada - f_esperada) ** 2
denominador = f_esperada
d_al_cuadrado = numerador / denominados # acá es sumatoria
def validar_hipotesis(self, d2, limite_superior):
if d2 <= limite_superior:
print("El test acepta la hipotesis nula")
else:
print("El test rechaza la hipótesis nula")
def frecuencias_observadas(self):
# Estas son las que tengo que obtener
numeros_generados = []
for i in range(self._numero_de_muestras):
numeros_generados.append(next(self._generador))
frecuencias = [0] * 10
for numero in numeros_generados:
pos = math.floor(numero*10)
if pos < 10:
frecuencias[pos] += 1
return frecuencias
def pseudo_chi2():
# Obtengo freuencias observadas de 0 a 9 (enteros)
# La frecuencia esperada es 1/10 por la cantidad de números aleatorios que genero
# Con estos valores ahora entonces calculo d2
# d2 = sum (frecuencia_observada - frecuencia_esperada) ** 2 / frecuencia_esperada
# Finalmente para chi2.ppf(0.95, df=5) vemos si d2 es mayor o menos al limite superior
return
# Corremos nuestro experimento
Chi2(numero_de_muestras=10000).realizar_prueba()
Frecuencias: [949, 998, 1019, 1007, 1008, 998, 1029, 968, 1001, 1023] Estadístico: 5.48 Límite superior: 11.07 El test acepta la hipotesis nula
Este test consiste en contar la cantidad de números aleatorios generados que no pertenecen a un intervalo dado, tal que consigamos 'gaps'. Para este test decidimos generar 1000 gaps y utilizar el intervalo [0.2, 0,5].
Finalmente aplicamos chi2 para validar (o rechazar) la hipótesis.
import matplotlib.pyplot as plt
class Gap:
def __init__(self, cota_inferior, cota_superior, g = None):
self._generador = g or LXM(42)
self._cota_inferior = cota_inferior
self._cota_superior = cota_superior
self._p_0 = cota_superior - cota_inferior
self._cantidad_de_gaps = 1000
def realizar_prueba(self):
# Espero a la primera vez que tengo valor dentro de la cota
gaps = self.generar_gaps()
self.plotear_gaps(gaps)
frecuencias_observadas = self.generar_frecuencias(gaps)
d2 = 0
# Aplicamos chi2
for i, frecuencia_observada in enumerate(frecuencias_observadas):
f_esperada = self.frecuencia_esperada(i)
numerador = (frecuencia_observada - f_esperada) ** 2
denominador = f_esperada
d2 += numerador / denominador
# Validamos hipótesis
k = len(frecuencias_observadas)
limite_superior = chi2.ppf(0.95, df=k-1)
print('Estadístico: {:.2f}'.format(d2))
print(f'Límite superior: {round(limite_superior, 2)}')
self.validar_hipotesis(d2, limite_superior)
def validar_hipotesis(self, d2, limite_superior):
if d2 <= limite_superior:
print("El test acepta la hipotesis nula")
else:
print("El test rechaza la hipótesis nula")
def generar_gaps(self):
gaps = []
# Generamos 1000 gaps
while(len(gaps) < self._cantidad_de_gaps):
while (True):
n = next(self._generador)
if n < self._cota_superior and n > self._cota_inferior:
break
gap = 0
while(True):
n = next(self._generador)
if n < self._cota_superior and n > self._cota_inferior:
gaps.append(gap)
break
else:
gap +=1
return gaps
def plotear_gaps(self, gaps):
plt.hist(gaps, color='b', bins="sturges", alpha=0.8, density=True, rwidth=0.8, align='left')
plt.show()
def generar_frecuencias(self, gaps):
frecuencias = [0] * (max(gaps) + 1)
for gap in gaps:
frecuencias[gap] += 1
return frecuencias
def frecuencia_esperada(self, n):
return (((1 - self._p_0)**n) * self._p_0) * self._cantidad_de_gaps
def pseudo_gap():
# Obtengo gaps entre un rango de valores
# (dentro de [0,1] ya que son los valores que genera el LXM programado
# Con estos valores ahora entonces calculo d2
# d2 = sum (frecuencia_observada - frecuencia_esperada) ** 2 / frecuencia_esperada
# La frecuencia esperada podrá ser distinta para cada valor, ya que
# la misma sigue una distribución geométrica
# Finalmente para chi2.ppf(0.95, df=cantidad_de_gaps) vemos si d2 es mayor o menos al limite superior
return
# Corremos nuestro experimento
Gap(cota_inferior=0.2, cota_superior=0.5).realizar_prueba()
Estadístico: 21.92 Límite superior: 31.41 El test acepta la hipotesis nula
Con este métod vamos a validar que las variables sean independientes.
from scipy.stats import chi2_contingency
class TestIndependencia:
def __init__(self, g = None):
self._generador = g or LXM(42)
self._cantidad_de_vectores = 500
def realizar_prueba(self):
nros_x = []
nros_y = []
for i in range(self._cantidad_de_vectores):
nros_x.append(next(self._generador))
nros_y.append(next(self._generador))
self.plot_vectores(nros_x, nros_y)
d2, p, dof, ex = chi2_contingency([nros_x, nros_y], correction=True)
# Validamos hipótesis
limite_superior = chi2.ppf(0.95, df=dof)
print('Estadístico: {:.2f}'.format(d2))
print(f'Límite superior: {round(limite_superior, 2)}')
self.validar_hipotesis(d2, limite_superior)
def validar_hipotesis(self, d2, limite_superior):
if d2 <= limite_superior:
print("El test acepta la hipotesis nula")
else:
print("El test rechaza la hipótesis nula")
def plot_vectores(self, nros_x, nros_y):
plt.title(f"Dispersión de {self._cantidad_de_vectores} números generados al azar")
plt.ylabel("X(n)-1")
plt.xlabel("X(n)")
plt.scatter(nros_x,nros_y,50)
plt.show()
# Corremos nuestro experimento
TestIndependencia().realizar_prueba()
Estadístico: 87.60 Límite superior: 552.07 El test acepta la hipotesis nula
gen = LXM(42)
def generar_exponencial(lambda_):
u_1 = next(gen)
return -np.log(u_1)/lambda_
def cociente(t):
return e**((-(t-1)**2)/2)
def transformar_gaussiana(x, mean, std):
return x*std + mean
def generar_normal_0_1():
no_aceptada = True
while(no_aceptada):
exp_1 = generar_exponencial(1)
prob = cociente(exp_1)
if(next(gen) < prob):
no_aceptada = False
z = exp_1
if(next(gen) < 0.5):
z = -z
return z
m1, m2, m3 = [], [], [] #muestras
for i in range(100):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m1.append(z2)
for i in range(1000):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m2.append(z2)
for i in range(10000):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m3.append(z2)
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(20, 5))
fig.tight_layout(pad=3)
ax1.hist(m1, 50)
ax1.set_xlabel('Valores generados', fontsize=15)
ax1.set_title("Histograma (n = 100)", fontsize=20)
ax2.hist(m2, 50)
ax2.set_xlabel('Valores generados', fontsize=15)
ax2.set_title("Histograma (n = 1000)", fontsize=20)
ax3.hist(m3, 50)
ax3.set_xlabel('Valores generados', fontsize=15)
ax3.set_title("Histograma (n = 10000)", fontsize=20)
plt.show();
El test de Kolmogorov-Smirnov se utiliza para decidir si una muestra proviene de una población con una distribución específica.
Se tiene una distribución de probabilidad acumulada, la cual se propone como distribución teórica de la muestra, y otra distribución de probabilidad empirica, generada a partir de la muestra.
En este caso tenemos las hipótesis:
m1_ks, m2_ks, m3_ks = [], [], [] #muestras
for i in range(100):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m1_ks.append(z2)
for i in range(1000):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m2_ks.append(z2)
for i in range(5000):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m3_ks.append(z2)
figsts, (ax1sts, ax2sts, ax3sts) = plt.subplots(1, 3, figsize=(10, 3))
figsts.tight_layout(pad=3)
state = np.random.default_rng()
dist_teorica_1 = sp.stats.norm(loc=10, scale=2).rvs(size=100, random_state=state)
dist_teorica_2 = sp.stats.norm(loc=10, scale=2).rvs(size=1000, random_state=state)
dist_teorica_3 = sp.stats.norm(loc=10, scale=2).rvs(size=5000, random_state=state)
yval1 = np.linspace(0.0, 1.0,num=100)
ax1sts.step(x=np.sort(m1_ks), y=yval1)
ax1sts.step(x=np.sort(dist_teorica_1), y=yval1)
ax1sts.set_xlabel('Valores generados (n=100)', fontsize=8)
ax1sts.set_title("Probabilidad Acumulada", fontsize=10)
yval2 = np.linspace(0.0, 1.0,num=1000)
ax2sts.step(x=np.sort(m2_ks), y=yval2)
ax2sts.step(x=np.sort(dist_teorica_2), y=yval2)
ax2sts.set_xlabel('Valores generados (n=1000)', fontsize=8)
ax2sts.set_title("Probabilidad Acumulada", fontsize=10)
yval3 = np.linspace(0.0, 1.0,num=5000)
ax3sts.step(x=np.sort(m3_ks), y=yval3)
ax3sts.step(x=np.sort(dist_teorica_3), y=yval3)
ax3sts.set_xlabel('Valores generados (n=5000)', fontsize=8)
ax3sts.set_title("Probabilidad Acumulada", fontsize=10)
plt.show();
figks, (ax1ks, ax2ks, ax3ks) = plt.subplots(1, 3, figsize=(20, 5))
figks.tight_layout(pad=3)
ax1ks.hist(m1_ks, 10)
ax1ks.set_xlabel('Valores generados', fontsize=15)
ax1ks.set_title("Histograma (n = 100)", fontsize=20)
ax2ks.hist(m2_ks, 25)
ax2ks.set_xlabel('Valores generados', fontsize=15)
ax2ks.set_title("Histograma (n = 1000)", fontsize=20)
ax3ks.hist(m3_ks, 50)
ax3ks.set_xlabel('Valores generados', fontsize=15)
ax3ks.set_title("Histograma (n = 5000)", fontsize=20)
plt.show();
from scipy import stats
ks1 = stats.kstest(m1_ks, dist_teorica_1, alternative='two-sided')
ks2 = stats.kstest(m2_ks, dist_teorica_2, alternative='two-sided')
ks3 = stats.kstest(m3_ks, dist_teorica_3, alternative='two-sided')
display ("Valor del estadístico del test Kolmogorov-Smirnov con n = 100: " + str(ks1.statistic))
display ("P-valor del test Kolmogorov-Smirnov con n = 100: " + str(ks1.pvalue))
display ("Valor del estadístico del test Kolmogorov-Smirnov con n = 1000: " + str(ks2.statistic))
display ("P-valor del test Kolmogorov-Smirnov con n = 1000: " + str(ks2.pvalue))
display ("Valor del estadístico del test Kolmogorov-Smirnov con n = 5000: " + str(ks3.statistic))
display ("P-valor del test Kolmogorov-Smirnov con n = 5000: " + str(ks3.pvalue))
'Valor del estadístico del test Kolmogorov-Smirnov con n = 100: 0.1'
'P-valor del test Kolmogorov-Smirnov con n = 100: 0.7020569828664881'
'Valor del estadístico del test Kolmogorov-Smirnov con n = 1000: 0.054'
'P-valor del test Kolmogorov-Smirnov con n = 1000: 0.1082872208757189'
'Valor del estadístico del test Kolmogorov-Smirnov con n = 5000: 0.0192'
'P-valor del test Kolmogorov-Smirnov con n = 5000: 0.3153878109541569'
Dado que en todos los casos el p-valor es superior a 0.05, no rechazamos la hipótesis de que los datos provienen de una distribución normal.
El test de Shapiro-Wilk está diseñado para decidir si una muestra aleatoria proviene de una distribución normal. En este caso se cuenta con las siguientes hipótesis:
$H_0$: Los datos provienen de una distribución normal
$H_1$: Los datos no provienen de una distribución normal
El test provee como resultados un estadístico y un p-valor. Cuando el p-valor obtenido es superior a cierto umbral --usualmente 0.05--, decimos que no podemos rechazar la hipótesis de que los datos provienen de una distribución normal.
La implementación utilizada (scipy) indica que para muestras de más de 5000 números, se pierde precisión en los valores del test. Por lo que se realizará el test para muestras de tamaños N = 100, 1000 y 5000. (Ver: https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.shapiro.html?highlight=shapiro)
m1_sw, m2_sw, m3_sw = [], [], [] #muestras
for i in range(100):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m1_sw.append(z2)
for i in range(1000):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m2_sw.append(z2)
for i in range(5000):
z1 = generar_normal_0_1()
z2 = transformar_gaussiana(z1, 10, 2)
m3_sw.append(z2)
figsw, (ax1sw, ax2sw, ax3sw) = plt.subplots(1, 3, figsize=(20, 5))
figsw.tight_layout(pad=3)
ax1sw.hist(m1_sw, 10)
ax1sw.set_xlabel('Valores generados', fontsize=15)
ax1sw.set_title("Histograma (n = 100)", fontsize=20)
ax2sw.hist(m2_sw, 25)
ax2sw.set_xlabel('Valores generados', fontsize=15)
ax2sw.set_title("Histograma (n = 1000)", fontsize=20)
ax3sw.hist(m3_sw, 50)
ax3sw.set_xlabel('Valores generados', fontsize=15)
ax3sw.set_title("Histograma (n = 5000)", fontsize=20)
plt.show();
from scipy import stats
shapiro1 = stats.shapiro(m1_sw)
shapiro2 = stats.shapiro(m2_sw)
shapiro3 = stats.shapiro(m3_sw)
display ("Valor del estadístico del test Shapiro-Wilk con n = 100: " + str(shapiro1.statistic))
display ("P-valor del test Shapiro-Wilk con n = 100: " + str(shapiro1.pvalue))
display ("Valor del estadístico del test Shapiro-Wilk con n = 1000: " + str(shapiro2.statistic))
display ("P-valor del test Shapiro-Wilk con n = 1000: " + str(shapiro2.pvalue))
display ("Valor del estadístico del test Shapiro-Wilk con n = 5000: " + str(shapiro3.statistic))
display ("P-valor del test Shapiro-Wilk con n = 5000: " + str(shapiro3.pvalue))
'Valor del estadístico del test Shapiro-Wilk con n = 100: 0.9904636740684509'
'P-valor del test Shapiro-Wilk con n = 100: 0.7020654678344727'
'Valor del estadístico del test Shapiro-Wilk con n = 1000: 0.9989786744117737'
'P-valor del test Shapiro-Wilk con n = 1000: 0.8619867563247681'
'Valor del estadístico del test Shapiro-Wilk con n = 5000: 0.9995430111885071'
'P-valor del test Shapiro-Wilk con n = 5000: 0.28750666975975037'
Dado que el p-valor es superior a 0.05, no rechazamos la hipótesis de que los datos provienen de una distribución normal.
arribos_real = []
with open("tiempos_entre_arribos.txt") as f:
for l in f.readlines(): arribos_real.append(float(l.strip()))
tasa = len(arribos_real)/sum(arribos_real)
print(f"Tengo {len(arribos_real)} arribos en {sum(arribos_real)} horas")
print(f"Mi tasa de arribos empírica es de {tasa} arribos por hora")
Tengo 10000 arribos en 1011.1994386169154 horas Mi tasa de arribos empírica es de 9.889245996494681 arribos por hora
# Para generar eventos de Poisson:
# 1. Generar muestras de una distribución exponencial Z, con parametro Lambda: z1, z2, z3...
# 2. Definir tiempos de eventos conteo de Poisson: t0 = 0, t1 = t0 + z1, t2 = t1 + z2....
def iter_exponencial(lmbda, generador, n):
"""Genera una secuencia que siga una distribución exponencial,
dado un generador que sigue una distribución uniforme"""
u = np.array([next(generador) for i in range(int(n))])
x = -np.log(1-u)/lmbda
return iter(x)
def generar_poisson(limite, lmbda, generador):
expo = iter_exponencial(lmbda, generador, lmbda*limite*1.5)
arribos = []
t = 0
while t <= limite:
arribo = next(expo)
arribos.append(arribo)
t += arribo
return arribos
limite = 24 * 31 # Un mes
arribos = generar_poisson(limite, tasa, LXM(42))
print(f"Simulé {len(arribos)} arribos en {sum(arribos)} horas")
print(f"Mi tasa de arribos simulada es de {len(arribos)/sum(arribos)} arribos por hora")
Simulé 7300 arribos en 744.064522714941 horas Mi tasa de arribos simulada es de 9.810977109033201 arribos por hora
def comparar_poissones(poissones_con_label, n = 15):
for arribos, label in poissones_con_label:
plt.step(np.cumsum(arribos[:n]),range(n),where= 'post',label=label)
plt.legend()
plt.title(f"Comparación de los primeros {n} arribos")
plt.show()
comparar_poissones([(arribos_real, "Poisson Empirica"), (arribos, "Poisson Simulada")])
# Hacemos un test de Kolmogorov Smirnov para confirmar que nuestros arribos simulados
# siguen una distribución exponencial
random_exponencial = np.random.exponential(1/tasa, 7300)
statistic_ks, pvalue_ks = stats.kstest(random_exponencial, arribos)
print("KS = %f, p = %f" % (statistic_ks, pvalue_ks))
print("Se rechaza H0" if pvalue_ks<0.05 else "No existe evidencia para rechazar H0")
KS = 0.009726, p = 0.880296 No existe evidencia para rechazar H0
limite = 96
poissones = []
for i in range(1000):
poissones.append(generar_poisson(limite, tasa, LXM(i)))
comparar_poissones([(p, f"Poisson Generada #{i}") for i, p in enumerate(poissones[:5])])
import math
# PMF de poisson, a mano, para comparar con los valores teóricos
pmf = lambda x, lmbda: ((lmbda ** x) * (math.e ** -lmbda)) / (math.factorial(x))
print("Probabilidad que el primer vehículo arribe antes de los 10 minutos.")
diez_min = 1/6
# La probabilidad de que haya al menos un arribo en 10 minutos equivale a
# 1 - la probabilidad de que no haya ningun arribo en 10 minutos
print(f"* Probabilidad Teórica => {1 - pmf(0, tasa*diez_min)}")
favorable = []
for p in poissones:
favorable.append(p[0] <= diez_min)
print(f"* Probabilidad Simulada => {sum(favorable)/len(favorable)}")
Probabilidad que el primer vehículo arribe antes de los 10 minutos. * Probabilidad Teórica => 0.8076055651533732 * Probabilidad Simulada => 0.818
print("Probabilidad que el undécimo vehículo arribe después de los 60 minutos.")
sesenta_min = 1
# La probabilidad de que el vehiculo 11 llegué despues de 60 minutos equivale a
# la sumatoria de que hayan llegado 0,1,2,...,10 vehiculos en 60 minutos
# Ojo con los off-by-one! `for i in range(11)` === de 0 a 10
print(f"* Probabilidad Teórica => {sum([pmf(i, tasa*sesenta_min) for i in range(11)])}")
favorable = []
for p in poissones:
arribos_acumulados = np.cumsum(p)
# El arribo en el indice [10] === El undecimo arribo
favorable.append(arribos_acumulados[10] >= 1)
print(f"* Probabilidad Simulada => {sum(favorable)/len(favorable)}")
Probabilidad que el undécimo vehículo arribe después de los 60 minutos. * Probabilidad Teórica => 0.596893339441658 * Probabilidad Simulada => 0.579
# Como nuestra pmf a mano da overflow al calcular un número grande como lambda a la 750,
# importamos directamente el modulo de scipy
from scipy.stats import poisson
print("Probabilidad que arriben al menos 750 vehículos antes de las 72 horas.")
setentidos_horas = 72
# La probabilidad de que arriben al menos 750 vehiculos en 72 horas equivale a
# 1 - la sumatoria de que lleguen 0,1,...,749 en 72 horas
print(f"* Probabilidad Teórica => {1 - sum([poisson.pmf(i, tasa*setentidos_horas) for i in range(750)])}")
favorable = []
for p in poissones:
arribos_acumulados = np.cumsum(p)
favorable.append(arribos_acumulados[749] <= 72)
print(f"* Probabilidad Simulada => {sum(favorable)/len(favorable)}")
Probabilidad que arriben al menos 750 vehículos antes de las 72 horas. * Probabilidad Teórica => 0.0809784853589276 * Probabilidad Simulada => 0.084
class EJ5:
def __init__(self, gdf, lxm):
self.gdf = gdf.copy()
self.lxm = lxm
self.puntos_en_barrios = {}
for barrio in self.gdf['BARRIO'].tolist():
self.puntos_en_barrios[barrio] = [[],[]]
def emitir_barrios(self):
display(self.gdf['BARRIO'])
#metodo auxiliar para trasladar un barrio al primer cuadrante
def trasladar_barrio(self, datos_barrio):
extremos_barrio = datos_barrio.geometry.bounds
signo1 = 1
signo2 = 1
if(extremos_barrio.iloc[0]['minx'] < 0):
signo1 = -1
if(extremos_barrio.iloc[0]['miny'] < 0):
signo2 = -1
return datos_barrio.translate(extremos_barrio.iloc[0]['minx']*signo1, extremos_barrio.iloc[0]['miny']*signo2)
#Generar n puntos en un barrio
def generar_en_barrio(self, barrio, cantidad):
x, y = [], []
#obtengo la geometria del barrio, la traslado al primer cuadrante
#y busco el maximo entre sus extremos para formar un cuadrado que los contenga
datos_barrio = self.gdf[self.gdf['BARRIO']==barrio]
g_barrio = datos_barrio['geometry']
g_barrio_t = self.trasladar_barrio(datos_barrio)
extremos_b = datos_barrio.bounds.to_numpy()
extremos_t = g_barrio_t.bounds.to_numpy()
maximo_t = np.amax(extremos_t)
#genero puntos dentro del cuadrado que obtuve, y los acepto si están dentro del barrio
x = []
y = []
while len(x) < cantidad:
z1 = next(self.lxm) * maximo_t
z2 = next(self.lxm) * maximo_t
ps = shapely.geometry.Point(z1, z2)
if g_barrio_t.contains(ps).any():
x.append(z1 + datos_barrio.geometry.bounds.minx)
y.append(z2 + datos_barrio.geometry.bounds.miny)
self.puntos_en_barrios[barrio] = [x, y]
#Generar n puntos en todos los barrios
def generar_en_todos(self, n):
for barrio in self.gdf['BARRIO'].tolist():
self.generar_en_barrio(barrio, n)
#Graficar un barrio y los puntos generados en el mismo
def graficar_en_barrio(self, barrio):
x = self.puntos_en_barrios[barrio][0]
y = self.puntos_en_barrios[barrio][1]
title_string = "Puntos generados en el barrio de {} (n = {})".format(barrio, len(x))
fig, ax = plt.subplots(figsize=(10, 10))
fig.tight_layout(pad=3)
ax.set_title(title_string, fontsize=20)
datos_barrio = self.gdf.loc[self.gdf['BARRIO']==barrio]
datos_barrio.plot(ax=ax, color='white', edgecolor='black')
ax.scatter(x, y, s=5, color="red")
ax.grid()
return fig, ax
#Graficar el mapa completo y los puntos generados en un barrio
def graficar_mapa_completo_barrio(self, nombre_barrio):
x = self.puntos_en_barrios[nombre_barrio][0]
y = self.puntos_en_barrios[nombre_barrio][1]
title_string = "Puntos generados en el barrio de {} (n = {})".format(nombre_barrio, len(x))
fig, ax = plt.subplots(figsize=(15, 15))
fig.tight_layout(pad=3)
ax.set_title(title_string, fontsize=20)
self.gdf.plot(ax=ax, color='white', edgecolor='black')
ax.scatter(x, y, s=5, color="red")
ax.grid()
return fig, ax
#Graficar el mapa completo y los puntos generados en todos los barrios (si hay)
def graficar_mapa_completo_todos(self):
title_string = "Puntos generados en todos los barrios."
fig, ax = plt.subplots(figsize=(15, 15))
fig.tight_layout(pad=3)
ax.set_title(title_string, fontsize=20)
self.gdf.plot(ax=ax, color='white', edgecolor='black')
for b in self.gdf['BARRIO'].tolist():
x = self.puntos_en_barrios[b][0]
y = self.puntos_en_barrios[b][1]
ax.scatter(x, y, s=5, color="red")
ax.grid()
return fig, ax
def help(self):
display("generar_en_barrio('barrio', cantidad)")
display("graficar_mapa_completo_barrio('barrio')")
display("graficar_mapa_completo_todos()")
display("graficar_en_barrio('barrio')")
display("generar_en_todos(cantidad)")
ej5test = EJ5(geodataframe, LXM(42))
ej5test.generar_en_barrio('PUERTO MADERO', 100)
ej5test.generar_en_todos(100)
ej5test.graficar_en_barrio('PUERTO MADERO')
ej5test.graficar_mapa_completo_barrio('PUERTO MADERO')
ej5test.graficar_mapa_completo_todos()
(<Figure size 1500x1500 with 1 Axes>,
<AxesSubplot: title={'center': 'Puntos generados en todos los barrios.'}>)
# Ahora, rehagamos partes del trabajo, pero basados en un GCL en vez de en LXM
# Son los resultados distintos?
padrones = [97568, 100029, 100033, 104030]
semilla = int(np.mean(padrones))
def GCL(x, a = 1013904223, c = 1664525, m = 232):
while True:
x = ((a * x + c) % m) / m
yield x
n = 5
g = GCL(semilla)
print(f"{n} números generados al azar, en el intervalo 0-1:")
for r, i in zip(g,range(n)):
print(f"* {r}")
5 números generados al azar, en el intervalo 0-1: * 0.9224137931034483 * 0.29819411399035617 * 0.5019334719098848 * 0.895838293535956 * 0.008367325211393422
# Empezamos con un scatterplot
g = GCL(semilla)
n = 1000
x = []; y = []
for i in range(n):
x.append(next(g))
y.append(next(g))
plt.title(f"Dispersión de {n} números generados al azar")
plt.ylabel("X(n)-1")
plt.xlabel("X(n)")
plt.scatter(x,y,50)
<matplotlib.collections.PathCollection at 0x7fae166534f0>
Chi2(numero_de_muestras=10000, g=GCL(semilla)).realizar_prueba()
Frecuencias: [1005, 996, 1013, 1018, 1002, 992, 987, 1020, 994, 973] Estadístico: 1.94 Límite superior: 11.07 El test acepta la hipotesis nula
limite = 24 * 31 # Un mes
arribos_gcl = generar_poisson(limite, tasa, GCL(semilla))
print(f"Simulé {len(arribos_gcl)} arribos en {sum(arribos_gcl)} horas")
print(f"Mi tasa de arribos simulada es de {len(arribos_gcl)/sum(arribos_gcl)} arribos por hora")
comparar_poissones([(arribos_real, "Poisson Empirica"), (arribos, "Poisson Simulada con LXM"), (arribos_gcl, "Poisson Simulada con GCL")])
Simulé 7394 arribos en 744.0969199691178 horas Mi tasa de arribos simulada es de 9.936877578134409 arribos por hora
ej5test = EJ5(geodataframe, GCL(semilla))
ej5test.generar_en_barrio('PUERTO MADERO', 100)
ej5test.generar_en_todos(100)
ej5test.graficar_en_barrio('PUERTO MADERO')
ej5test.graficar_mapa_completo_barrio('PUERTO MADERO')
ej5test.graficar_mapa_completo_todos()
(<Figure size 1500x1500 with 1 Axes>,
<AxesSubplot: title={'center': 'Puntos generados en todos los barrios.'}>)
Conclusión: Los resultados con LXM y con el GCL a lo largo del TP nos parecen bastante similares, lo cual nos parece que tiene muchísimo sentido! LXM se basa fuertemente en un GCL.